View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.persistence;
44  
45  import java.sql.Connection;
46  import java.sql.PreparedStatement;
47  import java.sql.ResultSet;
48  import java.sql.SQLException;
49  import java.util.Vector;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  
54  import org.exolab.jms.client.JmsDestination;
55  import org.exolab.jms.client.JmsTopic;
56  import org.exolab.jms.messagemgr.PersistentMessageHandle;
57  import org.exolab.jms.messagemgr.MessageHandle;
58  
59  
60  /***
61   * This class provides persistency for MessageHandle objects
62   * in an RDBMS database
63   *
64   * @version     $Revision: 1.4 $ $Date: 2005/08/31 05:45:50 $
65   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
66   */
67  class MessageHandles {
68  
69      /***
70       * The destination manager.
71       */
72      private final Destinations _destinations;
73  
74      /***
75       * The consumer manager.
76       */
77      private final Consumers _consumers;
78  
79      /***
80       * prepared statement for inserting a message handle
81       */
82      private static final String INSERT_MSG_HANDLE_STMT =
83              "insert into message_handles (messageid, destinationid, consumerid, "
84              + "priority, acceptedtime, sequencenumber, expirytime, delivered) "
85              + "values (?,?,?,?,?,?,?,?)";
86  
87      /***
88       * prepared statements for deleting message handle
89       */
90      private static final String DELETE_MSG_HANDLE_STMT1 =
91          "delete from message_handles where messageId=? and consumerId=?";
92      private static final String DELETE_MSG_HANDLE_STMT2 =
93          "delete from message_handles where messageId=? and destinationId=? " +
94          "and consumerId=?";
95  
96      /***
97       * Delete all message handles with the specified message id
98       */
99      private static final String DELETE_MSG_HANDLES_STMT =
100         "delete from message_handles where messageId=?";
101 
102     /***
103      * Update a row in the message handles table
104      */
105     private static final String UPDATE_MSG_HANDLE_STMT =
106         "update message_handles set delivered=? where messageId=? and " +
107         "destinationId=? and consumerId=?";
108 
109     /***
110      * Delete all message handles for a destination
111      */
112     private static final String DELETE_MSG_HANDLES_FOR_DEST =
113         "delete from message_handles where destinationId=?";
114 
115     /***
116      * Retrieve all message handles for a particular consumer
117      */
118     private static final String GET_MSG_HANDLES_FOR_DEST =
119         "select messageid, destinationid, consumerid, priority, acceptedtime, "
120         + "sequencenumber, expirytime, delivered from message_handles "
121         + "where consumerId=? order by acceptedTime asc";
122 
123     /***
124      * Retrieve a range of message handles between the specified times
125      */
126     private static final String GET_MESSAGE_HANDLES_IN_RANGE =
127         "select distinct messageId from message_handles where " +
128         " acceptedTime >= ? and acceptedTime <=?";
129 
130     /***
131      * Retrieve a handle with the specified id
132      */
133     private static final String GET_MESSAGE_HANDLE_WITH_ID =
134         "select distinct messageId from message_handles where messageId=?";
135 
136     /***
137      * Return the number of messages and a specified destination and cousmer
138      */
139     private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
140         "select count(messageId) from message_handles where destinationId=? " +
141         "and consumerId=?";
142 
143     /***
144      * Return the number of messages and a specified consumer
145      */
146     private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
147         "select count(messageId) from message_handles where consumerId=?";
148 
149     /***
150      * Delete all expired messages
151      */
152     private static final String DELETE_EXPIRED_MESSAGES =
153         "delete from message_handles where consumerId=? and expiryTime != 0 " +
154         "and expiryTime<?";
155 
156     /***
157      * The logger.
158      */
159     private static final Log _log = LogFactory.getLog(MessageHandles.class);
160 
161 
162     /***
163      * Construct a new <code>MessageHandles</code>.
164      *
165      * @param destinations the destinations manager
166      * @param consumers the consumers manager
167      */
168     public MessageHandles(Destinations destinations, Consumers consumers) {
169         _destinations = destinations;
170         _consumers = consumers;
171     }
172 
173     /***
174      * Add the specified message handle to the database.
175      *
176      * @param connection - the connection to use
177      * @param handle - message handle to add
178      * @throws PersistenceException - if add does not complete
179      */
180     public void addMessageHandle(Connection connection,
181                                  MessageHandle handle)
182         throws PersistenceException {
183 
184         if (_log.isDebugEnabled()) {
185             _log.debug("addMessageHandle(handle=[consumer="
186                        + handle.getConsumerPersistentId()
187                        + ", destination=" + handle.getDestination() 
188                        + ", id=" + handle.getMessageId() + "])");
189         }
190 
191         PreparedStatement insert = null;
192         try {
193             // map the destination name to an actual identity
194             long destinationId = _destinations.getId(
195                 handle.getDestination().getName());
196             if (destinationId == 0) {
197                 throw new PersistenceException(
198                     "Cannot add message handle id=" + handle.getMessageId() +
199                     " for destination=" + handle.getDestination().getName() +
200                     " and consumer=" + handle.getConsumerPersistentId() +
201                     " since the destination cannot be mapped to an id");
202             }
203 
204             // map the consumer name ot an identity
205             long consumerId = _consumers.getConsumerId(
206                 handle.getConsumerPersistentId());
207             if (consumerId == 0) {
208                 throw new PersistenceException(
209                     "Cannot add message handle id=" + handle.getMessageId() +
210                     " for destination=" + handle.getDestination().getName() +
211                     " and consumer=" + handle.getConsumerPersistentId() +
212                     " since the consumer cannot be mapped to an id");
213             }
214 
215             insert = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
216             insert.setString(1, handle.getMessageId());
217             insert.setLong(2, destinationId);
218             insert.setLong(3, consumerId);
219             insert.setInt(4, handle.getPriority());
220             insert.setLong(5, handle.getAcceptedTime());
221             insert.setLong(6, handle.getSequenceNumber());
222             insert.setLong(7, handle.getExpiryTime());
223             insert.setInt(8, (handle.getDelivered()) ? 1 : 0);
224 
225             // execute the insert
226             if (insert.executeUpdate() != 1) {
227                 _log.error(
228                     "Failed to execute addMessageHandle for handle="
229                     + handle.getMessageId() + ", destination Id="
230                     + destinationId);
231             }
232         } catch (SQLException exception) {
233             throw new PersistenceException("Failed to add message handle=" +
234                 handle, exception);
235         } finally {
236             SQLHelper.close(insert);
237         }
238     }
239 
240     /***
241      * Remove the specified message handle from the database. Once the handle
242      * has been removed check to see whether there are any more message handles
243      * referencing the same message. If there are not then remove the
244      * corresponding message from the messages tables.
245      *
246      * @param connection - the connection to use
247      * @param  handle - the handle to remove
248      * @throws  PersistenceException - sql releated exception
249      */
250     public void removeMessageHandle(Connection connection,
251                                     MessageHandle handle)
252         throws PersistenceException {
253 
254         if (_log.isDebugEnabled()) {
255             _log.debug("removeMessageHandle(handle=[consumer="
256                        + handle.getConsumerPersistentId()
257                        + ", destination=" + handle.getDestination() 
258                        + ", id=" + handle.getMessageId() + "])");
259         }
260 
261         PreparedStatement delete = null;
262         PreparedStatement select = null;
263         ResultSet rs = null;
264 
265         try {
266             // first check to see that the consumer exists and only
267             // proceed if it non-zero.
268             long consumerId = _consumers.getConsumerId(
269                 handle.getConsumerPersistentId());
270             if (consumerId != 0) {
271                 // get the message id
272                 String id = handle.getMessageId();
273 
274                 // map the destination name to an actual identity. If it is
275                 // null then the destination does not currently exist but we
276                 // may need to delete orphaned handles
277                 long destinationId = _destinations.getId(
278                     handle.getDestination().getName());
279 
280                 if (destinationId == 0) {
281                     delete = connection.prepareStatement(
282                         DELETE_MSG_HANDLE_STMT1);
283                     delete.setString(1, id);
284                     delete.setLong(2, consumerId);
285 
286                 } else {
287                     delete = connection.prepareStatement(
288                         DELETE_MSG_HANDLE_STMT2);
289                     delete.setString(1, id);
290                     delete.setLong(2, destinationId);
291                     delete.setLong(3, consumerId);
292                 }
293 
294                 // execute the delete
295                 if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
296                     // only log if the message hasn't been garbage
297                     // collected
298                     _log.error("Failed to execute removeMessageHandle for "
299                         + "handle=" + id + " destination id="
300                         + destinationId + " consumer id=" + consumerId);
301                 }
302             }
303         } catch (SQLException exception) {
304             throw new PersistenceException("Failed to remove message handle=" +
305                 handle, exception);
306         } finally {
307             SQLHelper.close(rs);
308             SQLHelper.close(delete);
309             SQLHelper.close(select);
310         }
311     }
312 
313     /***
314      * Update the specified message handle from the database
315      *
316      * @param connection - the connection to use
317      * @param  handle - the handle to update
318      * @throws  PersistenceException - sql releated exception
319      */
320     public void updateMessageHandle(Connection connection,
321                                     MessageHandle handle)
322         throws PersistenceException {
323         PreparedStatement update = null;
324 
325         if (_log.isDebugEnabled()) {
326             _log.debug("updateMessageHandle(handle=[consumer="
327                        + handle.getConsumerPersistentId()
328                        + ", destination=" + handle.getDestination() 
329                        + ", id=" + handle.getMessageId() + "])");
330         }
331 
332         try {
333             // get the message id
334             String id = handle.getMessageId();
335 
336             // map the destination name to an actual identity
337             long destinationId = _destinations.getId(
338                 handle.getDestination().getName());
339             if (destinationId == 0) {
340                 throw new PersistenceException(
341                     "Cannot update message handle id=" +
342                     handle.getMessageId() + " for destination=" +
343                     handle.getDestination().getName() + " and consumer=" +
344                     handle.getConsumerPersistentId() +
345                     " since the destination cannot be mapped to an id");
346             }
347 
348             // map the consumer name to an identity
349             long consumerId = _consumers.getConsumerId(
350                 handle.getConsumerPersistentId());
351             if (consumerId == 0) {
352                 throw new PersistenceException(
353                     "Cannot update message handle id=" +
354                     handle.getMessageId() + " for destination=" +
355                     handle.getDestination().getName() + " and consumer=" +
356                     handle.getConsumerPersistentId() +
357                     " since the consumer cannot be mapped to an id");
358             }
359 
360             update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT);
361             update.setInt(1, handle.getDelivered() ? 1 : 0);
362             update.setString(2, id);
363             update.setLong(3, destinationId);
364             update.setLong(4, consumerId);
365 
366             // execute the delete
367             if (update.executeUpdate() != 1 && !handle.hasExpired()) {
368                 // only log if the message hasn't been garbage collected
369                 _log.error(
370                     "Failed to execute updateMessageHandle for handle=" +
371                     id + ", destination id=" + destinationId +
372                     ", consumer id=" + consumerId);
373             }
374         } catch (SQLException exception) {
375             throw new PersistenceException("Failed to update message handle=" +
376                 handle, exception);
377         } finally {
378             SQLHelper.close(update);
379         }
380     }
381 
382     /***
383      * Remove all the message handles associated with the specified destination
384      *
385      * @param connection - the connection to use
386      * @param  destination the name of the destination
387      * @throws  PersistenceException - sql releated exception
388      */
389     public void removeMessageHandles(Connection connection, String destination)
390         throws PersistenceException {
391 
392         PreparedStatement delete = null;
393 
394         try {
395             // map the destination name to an actual identity
396             long destinationId = _destinations.getId(destination);
397             if (destinationId == 0) {
398                 throw new PersistenceException(
399                     "Cannot remove message handles for destination=" +
400                     destination + " since the destination cannot be " +
401                     "mapped to an id");
402             }
403 
404             delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST);
405             delete.setLong(1, destinationId);
406             delete.executeUpdate();
407         } catch (SQLException exception) {
408             throw new PersistenceException(
409                 "Failed to remove message handles for destination=" +
410                 destination, exception);
411         } finally {
412             SQLHelper.close(delete);
413         }
414     }
415 
416     /***
417      * Remove all the message handles for the specified messageid
418      *
419      * @param connection - the connection to use
420      * @param messageId the message identity
421      * @throws  PersistenceException - sql releated exception
422      */
423     public void removeMessageHandles(Connection connection, long messageId)
424         throws PersistenceException {
425 
426         PreparedStatement delete = null;
427 
428         try {
429             delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT);
430             delete.setLong(1, messageId);
431             delete.executeUpdate();
432         } catch (SQLException exception) {
433             throw new PersistenceException(
434                 "Failed to remove message handles for message id=" + messageId,
435                 exception);
436         } finally {
437             SQLHelper.close(delete);
438         }
439     }
440 
441     /***
442      * Retrieve the message handle for the specified desitation and consumer
443      * name
444      *
445      * @param connection - the connection to use
446      * @param destination - destination name
447      * @param name - consumer name
448      * @return Vector - collection of MessageHandle objects
449      * @throws  PersistenceException - sql releated exception
450      */
451     public Vector getMessageHandles(Connection connection, String destination,
452                                     String name)
453         throws PersistenceException {
454 
455         Vector result = new Vector();
456         PreparedStatement select = null;
457         ResultSet set = null;
458 
459         // if the consumer and/or destination cannot be mapped then
460         // return an empty vector
461         long destinationId = _destinations.getId(destination);
462         long consumerId = _consumers.getConsumerId(name);
463         if ((consumerId == 0) ||
464             (destinationId == 0)) {
465             return result;
466         }
467 
468         // all preprequisites have been met so continue processing the
469         // request.
470         try {
471             select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST);
472             select.setLong(1, consumerId);
473 
474             // iterate through the result set and construct the corresponding
475             // MessageHandles
476             set = select.executeQuery();
477             while (set.next()) {
478                 // Attempt to retrieve the corresponding destination
479                 JmsDestination dest = _destinations.get(set.getLong(2));
480                 if (dest == null) {
481                     throw new PersistenceException(
482                         "Cannot create persistent handle, because " +
483                         "destination mapping failed for " + set.getLong(2));
484                 }
485 
486                 String consumer = _consumers.getConsumerName(set.getLong(3));
487                 if (name == null) {
488                     throw new PersistenceException(
489                         "Cannot create persistent handle because " +
490                         "consumer mapping failed for " + set.getLong(3));
491                 }
492 
493                 String messageId = set.getString(1);
494                 int priority = set.getInt(4);
495                 long acceptedTime = set.getLong(5);
496                 long sequenceNumber = set.getLong(6);
497                 long expiryTime = set.getLong(7);
498                 boolean delivered = (set.getInt(8) == 0) ? false : true;
499                 MessageHandle handle = new PersistentMessageHandle(
500                         messageId, priority, acceptedTime, sequenceNumber,
501                         expiryTime, dest, consumer);
502                 handle.setDelivered(delivered);
503                 result.add(handle);
504             }
505         } catch (SQLException exception) {
506             throw new PersistenceException(
507                 "Failed to get message handles for destination=" +
508                 destination + ", consumer=" + name, exception);
509         } finally {
510             SQLHelper.close(set);
511             SQLHelper.close(select);
512         }
513 
514         return result;
515     }
516 
517     /***
518      * Retrieve a distint list of message ids, in this table, between the min
519      * and max times inclusive.
520      *
521      * @param connection - the connection to use
522      * @param min - the minimum time in milliseconds
523      * @param max - the maximum time in milliseconds
524      * @return Vector - collection of String objects
525      * @throws  PersistenceException - sql related exception
526      */
527     public Vector getMessageIds(Connection connection, long min, long max)
528         throws PersistenceException {
529 
530         Vector result = new Vector();
531         PreparedStatement select = null;
532         ResultSet set = null;
533 
534         try {
535             select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE);
536             select.setLong(1, min);
537             select.setLong(2, max);
538 
539             // iterate through the result set and construct the corresponding
540             // MessageHandles
541             set = select.executeQuery();
542             while (set.next()) {
543                 result.add(set.getString(1));
544             }
545 
546            
547         } catch (SQLException exception) {
548             throw new PersistenceException("Failed to retrieve message ids",
549                 exception);
550         } finally {
551             SQLHelper.close(set);
552             SQLHelper.close(select);
553         }
554 
555         return result;
556     }
557 
558     /***
559      * Check if a message with the specified messageId exists in the
560      * table
561      *
562      * @param connection - the connection to use
563      * @param messageId the  message Identifier
564      * @return Vector - collection of MessageHandle objects
565      * @throws  PersistenceException - sql releated exception
566      */
567     public boolean messageExists(Connection connection, long messageId)
568         throws PersistenceException {
569 
570         boolean result = false;
571         PreparedStatement select = null;
572         ResultSet set = null;
573 
574         try {
575             select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID);
576             select.setLong(1, messageId);
577             set = select.executeQuery();
578 
579             if (set.next()) {
580                 result = true;
581             }
582             
583         } catch (SQLException exception) {
584             throw new PersistenceException(
585                 "Failed to determine if message exists, id=" + messageId,
586                 exception);
587         } finally {
588             SQLHelper.close(set);
589             SQLHelper.close(select);
590         }
591         return result;
592     }
593 
594     /***
595      * Returns the number of messages for the specified destination and
596      * consumer
597      *
598      * @param connection - the connection to use
599      * @param destination - destination name
600      * @param name - consumer name
601      * @return Vector - collection of MessageHandle objects
602      * @throws  PersistenceException - sql releated exception
603      */
604     public int getMessageCount(Connection connection, String destination,
605                                String name)
606         throws PersistenceException {
607 
608         int result = -1;
609         boolean destinationIsWildCard = false;
610 
611         // map the destination name to an actual identity
612         long destinationId = _destinations.getId(destination);
613         if (destinationId == 0) {
614             if (JmsTopic.isWildCard(destination)) {
615                 destinationIsWildCard = true;
616             } else {
617                 throw new PersistenceException(
618                     "Cannot get message handle count for destination=" +
619                     destination + " and consumer=" + name +
620                     " since the destination cannot be mapped to an id");
621             }
622         }
623 
624         // map the consumer name to an identity
625         long consumerId = _consumers.getConsumerId(name);
626         if (consumerId == 0) {
627             throw new PersistenceException(
628                 "Cannot get message handle count for destination=" +
629                 destination + " and consumer=" + name +
630                 " since the consumer cannot be mapped to an id");
631         }
632 
633         PreparedStatement select = null;
634         ResultSet set = null;
635 
636         try {
637             if (destinationIsWildCard) {
638                 select = connection.prepareStatement(
639                     GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
640                 select.setLong(1, destinationId);
641                 select.setLong(2, consumerId);
642             } else {
643                 select = connection.prepareStatement(
644                     GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
645                 select.setLong(1, consumerId);
646             }
647 
648             set = select.executeQuery();
649             if (set.next()) {
650                 result = set.getInt(1);
651             }
652         } catch (SQLException exception) {
653             throw new PersistenceException(
654                 "Failed to count messages for destination=" + destination +
655                 ", consumer=" + name, exception);
656         } finally {
657             SQLHelper.close(set);
658             SQLHelper.close(select);
659         }
660 
661         return result;
662     }
663 
664     /***
665      * Remove all expired handles for the specified consumer
666      *
667      * @param connection - the connection to use
668      * @param consumer - consumer name
669      * @throws  PersistenceException - sql releated exception
670      */
671     public void removeExpiredMessageHandles(Connection connection,
672                                             String consumer)
673         throws PersistenceException {
674 
675         PreparedStatement delete = null;
676 
677         // map the consumer name ot an identity
678         long consumerId = _consumers.getConsumerId(consumer);
679         if (consumerId != 0) {
680             try {
681                 delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
682                 delete.setLong(1, consumerId);
683                 delete.setLong(2, System.currentTimeMillis());
684                 delete.executeUpdate();
685             } catch (SQLException exception) {
686                 throw new PersistenceException(
687                     "Failed to remove expired message handles",
688                     exception);
689             } finally {
690                 SQLHelper.close(delete);
691             }
692         }
693     }
694 
695 }